package com.isyscore.os.flinksql.sql;

import com.isyscore.os.flinksql.LogPrint;
import com.isyscore.os.flinksql.model.SqlCommandCall;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

import java.util.List;

/**
 * Executes parsed Flink SQL statements against a {@link TableEnvironment}.
 *
 * @author wany
 */
@Slf4j
public class SqlExecutor {

    /**
     * Dispatches every parsed statement according to its command type.
     *
     * <ul>
     *   <li>{@code SET}: writes the key/value pair into the table environment configuration.</li>
     *   <li>{@code INSERT INTO} / {@code INSERT OVERWRITE}: logged and added to {@code statementSet};
     *       this method never executes the statement set itself.</li>
     *   <li>{@code SELECT} / {@code SHOW ...}: handed to {@code LogPrint.queryRestPrint}.</li>
     *   <li>{@code BEGIN STATEMENT SET} / {@code END}: only logged, never executed.</li>
     *   <li>Anything else: logged and executed directly through {@code tEnv.executeSql}.</li>
     * </ul>
     *
     * @param sqlCommandCallList parsed statements, processed in list order
     * @param tEnv               table environment used for configuration and direct execution
     * @param statementSet       collector for the INSERT statements
     */
    public static void exeSql(List<SqlCommandCall> sqlCommandCallList, TableEnvironment tEnv, StatementSet statementSet) {
        for (SqlCommandCall sqlCommandCall : sqlCommandCallList) {
            switch (sqlCommandCall.sqlCommand) {
                // Configuration
                case SET:
                    applySetCommand(tEnv, sqlCommandCall.operands);
                    break;
                // INSERT statements are only collected here
                case INSERT_INTO:
                case INSERT_OVERWRITE:
                    LogPrint.logPrint(sqlCommandCall);
                    statementSet.addInsertSql(sqlCommandCall.operands[0]);
                    break;
                // Display statements
                case SELECT:
                case SHOW_CATALOGS:
                case SHOW_DATABASES:
                case SHOW_MODULES:
                case SHOW_TABLES:
                    LogPrint.queryRestPrint(tEnv, sqlCommandCall);
                    break;
                // Kept for compatibility with sql-client.sh usage: displayed but not executed
                case BEGIN_STATEMENT_SET:
                case END:
                    LogPrint.logPrint(sqlCommandCall);
                    break;
                default:
                    LogPrint.logPrint(sqlCommandCall);
                    tEnv.executeSql(sqlCommandCall.operands[0]);
                    break;
            }
        }
    }

    /**
     * Applies a SET statement, tolerating a call that carries no key/value pair.
     *
     * <p>Without this guard a SET call with fewer than two operands would fail with an
     * {@link ArrayIndexOutOfBoundsException} and abort the remaining statements.
     *
     * @param tEnv     table environment whose configuration is updated
     * @param operands operands of the SET call; index 0 is used as the key, index 1 as the value
     */
    private static void applySetCommand(TableEnvironment tEnv, String[] operands) {
        if (operands == null || operands.length < 2) {
            log.warn("Ignoring SET statement without a key/value pair");
            return;
        }
        setConfProp(tEnv, operands[0], operands[1]);
    }

    /**
     * Stores one configuration entry in the table environment.
     *
     * <p>The entry is skipped without any log output when the key or the value is
     * {@code null} or empty.
     *
     * @param tEnv  table environment whose configuration is updated
     * @param key   configuration key
     * @param value configuration value
     */
    private static void setConfProp(TableEnvironment tEnv, String key, String value) {
        if (StringUtils.isEmpty(key) || StringUtils.isEmpty(value)) {
            return;
        }
        Configuration configuration = tEnv.getConfig().getConfiguration();
        log.info("#############setConfiguration#############\n  key={} value={}", key, value);
        configuration.setString(key, value);
    }

}
